package org.luckyjourney.service.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.luckyjourney.constant.RedisConstant;
import org.luckyjourney.entity.video.Video;
import org.luckyjourney.entity.user.User;
import org.luckyjourney.entity.vo.HotVideo;
import org.luckyjourney.entity.vo.Model;
import org.luckyjourney.entity.vo.UserModel;
import org.luckyjourney.service.InterestPushService;
import org.luckyjourney.service.video.TypeService;
import org.luckyjourney.util.RedisCacheUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

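/**
 * @description: Redis-backed implementation of {@link InterestPushService}: keeps per-label and
 *               per-type sets of video ids, maintains each user's label-weight model, and picks
 *               random video ids from those structures.
 * @Author: Xhy
 * @CreateTime: 2023-10-26 11:54
 */
// Asynchronous for now.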
@Service
public class InterestPushServiceImpl implements InterestPushService {

    // Project Redis helper used here for set and hash reads/writes (sSet, setRemove, sRandom, hmset, hmget, del).
    @Autowired
    private RedisCacheUtil redisCacheUtil;

    // Supplies the random labels used when recommending videos to guests.
    @Autowired
    private TypeService typeService;

    // Raw template used directly for pipelined commands and for set / sorted-set sampling.
    @Autowired
    private RedisTemplate redisTemplate;


    // Parses hot-rank sorted-set members into HotVideo objects.
    final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Adds the video's id to the system stock set of every label the video carries.
     *
     * <p>All {@code SADD} commands are sent in a single pipeline. Runs asynchronously.
     *
     * @param video the video to register; nothing is written when it has no id or no labels
     */
    @Override
    @Async
    public void pushSystemStockIn(Video video) {
        final List<String> labels = video.buildLabel();
        final Long videoId = video.getId();
        // A null id would be stored as the literal "null" and break readers that parse members as longs.
        if (videoId == null || ObjectUtils.isEmpty(labels)) {
            return;
        }
        // Encode explicitly as UTF-8 so the key bytes of non-ASCII labels do not depend on the
        // JVM's default charset.
        final byte[] member = String.valueOf(videoId).getBytes(StandardCharsets.UTF_8);
        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (String label : labels) {
                connection.sAdd((RedisConstant.SYSTEM_STOCK + label).getBytes(StandardCharsets.UTF_8), member);
            }
            // The pipeline collects the replies itself; the callback must return null.
            return null;
        });
    }

    /**
     * Records the video's id in the stock set of the type it belongs to. Runs asynchronously.
     *
     * @param video the video whose type id and id are used
     */
    @Override
    @Async
    public void pushSystemTypeStockIn(Video video) {
        // Set key: SYSTEM_TYPE_STOCK prefix + type id; member: the video id.
        final String typeStockKey = RedisConstant.SYSTEM_TYPE_STOCK + video.getTypeId();
        redisCacheUtil.sSet(typeStockKey, video.getId());
    }

    /**
     * Picks up to 12 random, distinct video ids from the stock set of the given type.
     *
     * @param typeId id of the video type whose stock set is sampled
     * @return the sampled video ids; empty when nothing could be sampled
     */
    @Override
    public Collection<Long> listVideoIdByTypeId(Long typeId) {
        // Ask Redis for distinct members: plain random sampling may repeat a member, and repeats
        // would collapse in the result set and yield fewer ids than the type has available.
        final Collection<?> members =
                redisTemplate.opsForSet().distinctRandomMembers(RedisConstant.SYSTEM_TYPE_STOCK + typeId, 12);
        final Set<Long> result = new HashSet<>();
        if (members == null) {
            return result;
        }
        for (Object member : members) {
            // Individual members may be null; skip them.
            if (member != null) {
                result.add(Long.parseLong(member.toString()));
            }
        }
        return result;
    }

    /**
     * Removes the video's id from the system stock set of every label the video carries.
     *
     * <p>All {@code SREM} commands are sent in a single pipeline. Runs asynchronously.
     *
     * @param video the video to unregister; nothing is sent when it has no id or no labels
     */
    @Override
    @Async
    public void deleteSystemStockIn(Video video) {
        final List<String> labels = video.buildLabel();
        final Long videoId = video.getId();
        if (videoId == null || ObjectUtils.isEmpty(labels)) {
            return;
        }
        // Encode explicitly as UTF-8 so the key bytes of non-ASCII labels do not depend on the
        // JVM's default charset.
        final byte[] member = String.valueOf(videoId).getBytes(StandardCharsets.UTF_8);
        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (String label : labels) {
                connection.sRem((RedisConstant.SYSTEM_STOCK + label).getBytes(StandardCharsets.UTF_8), member);
            }
            // The pipeline collects the replies itself; the callback must return null.
            return null;
        });
    }

    /**
     * Resets the user's interest model so the given labels share a total weight of 100 equally.
     *
     * <p>The previous model is always deleted first. When no labels are supplied the model is left
     * deleted and nothing is written. Runs asynchronously.
     *
     * @param userId id of the user whose model is reset
     * @param labels labels the user is interested in; may be null or empty
     */
    @Override
    @Async
    public void initUserModel(Long userId, List<String> labels) {
        final String key = RedisConstant.USER_MODEL + userId;
        redisCacheUtil.del(key);
        if (ObjectUtils.isEmpty(labels)) {
            // Nothing to store: a hash write needs at least one field.
            return;
        }
        // Floating-point division keeps the shares exact (integer 100 / 3 would truncate to 33).
        final double probabilityValue = 100.0 / labels.size();
        final Map<Object, Object> modelMap = new HashMap<>();
        for (String labelName : labels) {
            modelMap.put(labelName, probabilityValue);
        }
        redisCacheUtil.hmset(key, modelMap);
        // TODO: no TTL is set on the user model yet.
    }


    /**
     * Applies a batch of label score changes to the stored interest model of a logged-in user.
     *
     * <p>Each change is added to the label's current score (0 when the label is new). Labels whose
     * resulting score is not positive are dropped from the model. Every remaining score {@code v}
     * is then rescaled to {@code (v + n) / n}, where {@code n} is the number of remaining labels,
     * to keep the values from growing without bound. Runs asynchronously.
     *
     * @param userModel the user id plus the label/score changes; ignored for guests (null user id)
     *                  or when it carries no model list
     */
    @Override
    @Async
    public void updateUserModel(UserModel userModel) {
        final Long userId = userModel.getUserId();
        final List<Model> models = userModel.getModels();
        if (userId == null || models == null) {
            return;
        }

        final String key = RedisConstant.USER_MODEL + userId;
        final Map<Object, Object> stored = redisCacheUtil.hmget(key);
        // Work on a private copy so the map handed back by the helper is never mutated.
        final Map<Object, Object> modelMap = stored == null ? new HashMap<>() : new HashMap<>(stored);

        // Labels that ended up without a positive score and must also disappear from the Redis hash.
        final Set<Object> removedLabels = new HashSet<>();
        for (Model model : models) {
            final Object label = model.getLabel();
            final Object current = modelMap.get(label);
            final double updated =
                    (current == null ? 0.0 : Double.parseDouble(current.toString())) + model.getScore();
            if (updated > 0.0) {
                modelMap.put(label, updated);
                removedLabels.remove(label);
            } else {
                // Remove by label (the map key), not by the score value.
                modelMap.remove(label);
                removedLabels.add(label);
            }
        }

        // Add the label count to every score and divide by it again to prevent data inflation.
        final int labelSize = modelMap.size();
        for (Map.Entry<Object, Object> entry : modelMap.entrySet()) {
            entry.setValue((Double.parseDouble(entry.getValue().toString()) + labelSize) / labelSize);
        }

        // A hash write needs at least one field, so skip it when every label was dropped.
        if (!modelMap.isEmpty()) {
            redisCacheUtil.hmset(key, modelMap);
        }
        // Writing the map only sets fields; dropped labels have to be deleted explicitly.
        // NOTE(review): assumes redisCacheUtil writes hashes through this same template (same
        // key/field serializers) - confirm.
        if (!removedLabels.isEmpty()) {
            redisTemplate.opsForHash().delete(key, removedLabels.toArray());
        }
    }

    /**
     * Recommends video ids for a user based on the stored interest model, or random ones for guests.
     *
     * <p>For a user with a non-empty model, 8 labels are drawn according to the model's weights,
     * one random video is taken from each label's stock, videos found in the user's history are
     * dropped, and one extra video chosen by the user's sex is added. Guests, and users without a
     * usable model, get one random video from each of 10 randomly drawn labels.
     *
     * @param user the current user, or {@code null} for a guest
     * @return the recommended video ids; may be empty
     */
    @Override
    public Collection<Long> listVideoIdByUserModel(User user) {
        if (user != null) {
            final Long userId = user.getId();
            final Map<Object, Object> modelMap = redisCacheUtil.hmget(RedisConstant.USER_MODEL + userId);
            if (!ObjectUtils.isEmpty(modelMap)) {
                final String[] probabilityArray = initProbabilityArray(modelMap);
                // An empty array cannot be sampled (Random.nextInt(0) throws); fall back to the guest path.
                if (probabilityArray.length > 0) {
                    return listModelVideoIds(userId, user.getSex(), probabilityArray);
                }
            }
        }
        return listGuestVideoIds();
    }

    // Samples videos for a logged-in user from the weighted label array and filters out watched ones.
    private Collection<Long> listModelVideoIds(Long userId, Boolean sex, String[] probabilityArray) {
        // Draw 8 labels; labels with a larger share of the array are drawn more often.
        final Random random = new Random();
        final List<String> labelNames = new ArrayList<>(8);
        for (int i = 0; i < 8; i++) {
            labelNames.add(probabilityArray[random.nextInt(probabilityArray.length)]);
        }

        // One random member per drawn label, fetched in a single pipeline. Keys are encoded as
        // UTF-8 explicitly so non-ASCII labels do not depend on the JVM's default charset.
        final List<Object> picked = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (String labelName : labelNames) {
                connection.sRandMember((RedisConstant.SYSTEM_STOCK + labelName).getBytes(StandardCharsets.UTF_8));
            }
            return null;
        });
        final Set<Long> ids = picked.stream()
                .filter(Objects::nonNull)
                .map(id -> Long.parseLong(id.toString()))
                .collect(Collectors.toSet());

        // Drop videos the user has already seen: every non-empty value stored under
        // HISTORY_VIDEO + videoId + ":" + userId is read as a video id and removed.
        if (!ids.isEmpty()) {
            final List<?> watched = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
                for (Long id : ids) {
                    connection.get((RedisConstant.HISTORY_VIDEO + id + ":" + userId).getBytes(StandardCharsets.UTF_8));
                }
                return null;
            });
            for (Object value : watched) {
                if (!ObjectUtils.isEmpty(value)) {
                    ids.remove(Long.valueOf(value.toString()));
                }
            }
        }

        final Set<Long> videoIds = new HashSet<>(ids);
        // Add one extra video chosen by sex; skipped when the user's sex is unknown.
        if (sex != null) {
            final Long extraId = randomVideoId(sex);
            if (extraId != null) {
                videoIds.add(extraId);
            }
        }
        return videoIds;
    }

    // Samples one random video from each of 10 randomly drawn labels (guest recommendation).
    private Collection<Long> listGuestVideoIds() {
        final List<String> labels = typeService.random10Labels();
        if (ObjectUtils.isEmpty(labels)) {
            // Without labels there is nothing to sample from.
            return new HashSet<>();
        }
        final Random random = new Random();
        final List<String> stockKeys = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            stockKeys.add(RedisConstant.SYSTEM_STOCK + labels.get(random.nextInt(labels.size())));
        }
        final List<Object> members = redisCacheUtil.sRandom(stockKeys);
        if (ObjectUtils.isEmpty(members)) {
            return new HashSet<>();
        }
        return members.stream()
                .filter(id -> !ObjectUtils.isEmpty(id))
                .map(id -> Long.valueOf(id.toString()))
                .collect(Collectors.toSet());
    }

    /**
     * Returns random video ids drawn from the system stock sets of the given labels.
     *
     * @param labelNames names of the labels to sample from
     * @return the sampled video ids, without duplicates; empty when nothing was found
     */
    @Override
    public Collection<Long> listVideoIdByLabels(List<String> labelNames) {
        // Turn every label name into the key of its stock set.
        final ArrayList<String> stockKeys = labelNames.stream()
                .map(labelName -> RedisConstant.SYSTEM_STOCK + labelName)
                .collect(Collectors.toCollection(ArrayList::new));

        final List<Object> randomMembers = redisCacheUtil.sRandom(stockKeys);
        if (ObjectUtils.isEmpty(randomMembers)) {
            return new HashSet<>();
        }
        // Empty entries are skipped; the rest are parsed as ids.
        return randomMembers.stream()
                .filter(member -> !ObjectUtils.isEmpty(member))
                .map(member -> Long.valueOf(member.toString()))
                .collect(Collectors.toSet());
    }

    /**
     * Removes the video's id from the stock set of the type it belongs to. Runs asynchronously.
     *
     * @param video the video whose type id and id are used
     */
    @Override
    @Async
    public void deleteSystemTypeStockIn(Video video) {
        // Set key: SYSTEM_TYPE_STOCK prefix + type id; member: the video id.
        final String typeStockKey = RedisConstant.SYSTEM_TYPE_STOCK + video.getTypeId();
        redisCacheUtil.setRemove(typeStockKey, video.getId());
    }


    /**
     * Picks a random member of the hot-rank sorted set and returns the id of the video it describes.
     *
     * @return the video id, or {@code null} when no member is available or it cannot be parsed
     */
    public Long randomHotVideoId() {
        final Object member = redisTemplate.opsForZSet().randomMember(RedisConstant.HOT_RANK);
        // No member to pick (for example an empty ranking): there is nothing to parse.
        if (member == null) {
            return null;
        }
        try {
            final HotVideo hotVideo = objectMapper.readValue(member.toString(), HotVideo.class);
            return hotVideo == null ? null : hotVideo.getVideoId();
        } catch (JsonProcessingException e) {
            // Best effort: an unparseable member is reported and treated as "no video".
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Picks a random video id from a label stock chosen by the user's sex.
     *
     * <p>{@code true} samples the "美女" stock, {@code false} samples the "宠物" stock.
     *
     * @param sex the user's sex flag; may be {@code null} when unknown
     * @return a video id, or {@code null} when the sex is unknown or the stock has no member
     */
    public Long randomVideoId(Boolean sex) {
        // Unboxing a null Boolean would throw; an unknown sex simply yields no pick.
        if (sex == null) {
            return null;
        }
        final String key = RedisConstant.SYSTEM_STOCK + (sex ? "美女" : "宠物");
        final Object member = redisCacheUtil.sRandom(key);
        if (member == null) {
            return null;
        }
        return Long.parseLong(member.toString());
    }

    /**
     * Draws a label from the probability array and returns a random video id from its stock.
     *
     * @param random           source of randomness used to pick the label
     * @param probabilityArray label names, each repeated according to its weight
     * @return a video id, or {@code null} when the array is empty or the label's stock has no member
     */
    public Long getVideoId(Random random, String[] probabilityArray) {
        // Random.nextInt(0) throws, so an empty array is answered with "no video".
        if (probabilityArray == null || probabilityArray.length == 0) {
            return null;
        }
        final String labelName = probabilityArray[random.nextInt(probabilityArray.length)];
        final Object member = redisCacheUtil.sRandom(RedisConstant.SYSTEM_STOCK + labelName);
        if (member == null) {
            return null;
        }
        return Long.parseLong(member.toString());
    }

    /**
     * Builds the weighted label array used for sampling: each label appears as many times as its weight.
     *
     * <p>With {@code n} labels, a label with score {@code v} (truncated to an int) gets weight
     * {@code (v + n) / n}, but never less than 1, so every label in the model can be drawn and
     * negative scores cannot produce an invalid array size.
     *
     * @param modelMap label -> score; scores may be numbers or their string form
     * @return the label array; empty when the model is empty. Element order is unspecified.
     */
    public String[] initProbabilityArray(Map<Object, Object> modelMap) {
        final int size = modelMap.size();
        final List<String> slots = new ArrayList<>();
        for (Map.Entry<Object, Object> entry : modelMap.entrySet()) {
            final Object value = entry.getValue();
            // Scores are not guaranteed to arrive as Double, so accept any number or numeric string.
            final int score;
            if (value instanceof Number) {
                score = ((Number) value).intValue();
            } else if (value == null) {
                score = 0;
            } else {
                score = (int) Double.parseDouble(value.toString());
            }
            // Adding the label count before dividing keeps the weight from becoming 0.
            final int weight = Math.max(1, (score + size) / size);
            slots.addAll(Collections.nCopies(weight, String.valueOf(entry.getKey())));
        }
        return slots.toArray(new String[0]);
    }




}
